10 限流降级与流量效果控制器(上)

从这篇开始,我们学习 Sentinel 提供的几个实现降级功能的 ProcessorSlot,这些 ProcessorSlot 检查实时指标数据是否达到规则所配置的阈值,当达到阈值时,或抛出 Block 异常或采取流量效果控制策略处理超阈值的流量。

Sentinel 实现限流降级、熔断降级、黑白名单限流降级、系统自适应限流降级以及热点参数限流降级都是由 ProcessorSlot、Checker、Rule、RuleManager 组合完成。ProcessorSlot 作为调用链路的切入点,负责调用 Checker 检查当前请求是否可以放行;Checker 则根据资源名称从 RuleManager 中拿到为该资源配置的 Rule(规则),取 ClusterNode 统计的实时指标数据与规则对比,如果达到规则的阈值则抛出 Block 异常,抛出 Block 异常意味着请求被拒绝,也就实现了限流或熔断。

可以总结为以下三个步骤:

  1. 在 ProcessorSlot#entry 方法中调用 Checker#check 方法,并将 DefaultNode 传递给 Checker。
  2. Checker 从 DefaultNode 拿到 ClusterNode,并根据资源名称从 RuleManager 获取为该资源配置的规则。
  3. Checker 从 ClusterNode 中获取当前时间窗口的某项指标数据(QPS、avgRt 等)与规则的阈值对比,如果达到规则的阈值则抛出 Block 异常(也有可能将 check 交给 Rule 去实现)。

限流规则与规则配置加载器

Sentinel 在最初的框架设计上,将是否允许请求通过的判断行为交给 Rule 去实现,所以将 Rule 定义成了接口。Rule 接口只定义了一个 passCheck 方法,即判断当前请求是否允许通过。Rule 接口的定义如下:

public interface Rule {

    boolean passCheck(Context context, DefaultNode node, int count, Object... args);

}
  • context:当前调用链路上下文。
  • node:当前资源的 DefaultNode。
  • count:一般为 1,用在令牌桶算法中表示需要申请的令牌数,用在 QPS 统计中表示一个请求。
  • args:方法调用参数(被 Sentinel 拦截的目标方法),用于实现热点参数限流降级的。

因为规则是围绕资源配置的,一个规则只对某个资源起作用,因此 Sentinel 提供了一个抽象规则配置类 AbstractRule,AbstractRule 的定义如下:

public abstract class AbstractRule implements Rule {

    private String resource;

    private String limitApp;

    // ....

}
  • resource:资源名称,规则的作用对象。
  • limitApp:只对哪个或者哪些调用来源生效,若为 default 则不区分调用来源。

Rule、AbstractRule 与其它实现类的关系如下图所示:

10-01-rule

FlowRule 是限流规则配置类,FlowRule 继承 AbstractRule 并实现 Rule 接口。FlowRule 源码如下,非完整源码,与实现集群限流相关的字段暂时去掉了。

public class FlowRule extends AbstractRule {

    // 限流阈值类型 qps|threads

    private int grade = RuleConstant.FLOW_GRADE_QPS;

    // 限流阈值

    private double count;

    // 基于调用关系的限流策略

    private int strategy = RuleConstant.STRATEGY_DIRECT;

    // 配置 strategy 使用,入口资源名称

    private String refResource;

    // 流量控制效果(直接拒绝、Warm Up、匀速排队)

    private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;

    // 冷启动时长(预热时长),单位秒

    private int warmUpPeriodSec = 10;

    // 最大排队时间。

    private int maxQueueingTimeMs = 500;

    // 流量控制器

    private TrafficShapingController controller;

    //.....

    @Override

    public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {

        return true;

    }

}
  • FlowRule 的字段解析已在源码中给出注释,现在还不需要急于去理解每个字段的作用。
  • FlowRule 实现 Rule 接口方法只是返回 true,因为 passCheck 的逻辑并不由 FlowRule 实现。

Rule 定义的行为应该只是 Sentinel 在最初搭建框架时定义的约定,Sentinel 自己也并没有都遵守这个约定,很多规则并没有将 passCheck 交给 Rule 去实现,Checker 可能是后续引入的,用于替代 Rule 的 passCheck 行为。

Sentinel 中用来管理规则配置的类都以规则类的名称+Manger 命名,除此之外,并没有对规则管理器有什么行为上的约束。

用来加载限流规则配置以及缓存限流规则配置的类为 FlowRuleManager,其部分源码如下:

public class FlowRuleManager {

    // 缓存规则

    private static final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<String, List<FlowRule>>();

    // 获取所有规则

    static Map<String, List<FlowRule>> getFlowRuleMap() {

        return flowRules;

    }

    // 更新规则

    public static void loadRules(List<FlowRule> rules) {

        // 更新静态字段 flowRules

    }

}
  • flowRules 静态字段:用于缓存规则配置,使用 ConcurrentMap 缓存,key 为资源的名称,value 是一个 FlowRule 数组。使用数组是因为 Sentinel 支持针对同一个资源配置多种限流规则,只要其中一个先达到限流的阈值就会触发限流。
  • loadRules:提供给使用者加载和更新规则的 API,该方法会将参数传递进来的规则数组转为 Map,然后先清空 flowRules 当前缓存的规则配置,再将新的规则配置写入 flowRules。
  • getFlowRuleMap:提供给 FlowSlot 获取配置的私有 API。

限流处理器插槽:FlowSlot

FlowSlot 是实现限流功能的切入点,它作为 ProcessorSlot 插入到 ProcessorSlotChain 链表中,在 entry 方法中调用 Checker 去判断是否需要拒绝当前请求,如果需要拒绝请求则抛出 Block 异常。FlowSlot 的源码如下:

public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    private final FlowRuleChecker checker;

    public FlowSlot() {

        this(new FlowRuleChecker());

    }
   // 规则生产者,一个 Function

    private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {

        // 参数为资源名称

        @Override

        public Collection<FlowRule> apply(String resource) {

            Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();

            return flowRules.get(resource);

        }

    };
    @Override

    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,

                      boolean prioritized, Object... args) throws Throwable {

        checkFlow(resourceWrapper, context, node, count, prioritized);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);

    }

  // check 是否限流

    void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)

        throws BlockException {

        checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);

    }
    @Override

    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {

        fireExit(context, resourceWrapper, count, args);

    }

}

FlowSlot 在构造方法中创建 FlowRuleChecker,并在 entry 方法中调用 FlowRuleChecker#checkFlow 方法判断是否需要拦截当前请求。在调用 FlowRuleChecker#checkFlow 方法时传入了一个 Function 接口实例,FlowRuleChecker 可调用该 Function 的 apply 方法从 FlowRuleManager 获取资源的所有规则配置,当然,最终还是调用 FlowRuleManager#getFlowRuleMap 方法从 FlowRuleManager 获取。

限流规则检查器:FlowRuleChecker

FlowRuleChecker 与 FlowRuleManager 一样,Sentinel 也并没有约定 Checker 必须具有哪些行为,只是在命名上约定 Checker 类需以规则类的名称 + “Checker”命名。FlowRuleChecker 负责判断是否需要拒绝当前请求,由于 FlowRuleChecker 类的源码很多,所以我们按过程分析用到的每个方法。

首先是由 FlowSlot 调用的 checkFlow 方法,该方法源码如下:

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,

                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {

        if (ruleProvider == null || resource == null) {

            return;

        }

        // (1)

        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());

        if (rules != null) {

            // (2)

            for (FlowRule rule : rules) {

                // (3)

                if (!canPassCheck(rule, context, node, count, prioritized)) {

                    throw new FlowException(rule.getLimitApp(), rule);

                }

            }

        }

}

checkFlow 方法我们分三步分析:

  • 调用 FlowSlot 传递过来的 ruleProvider 的 apply 方法获取当前资源的所有限流规则;
  • 遍历限流规则,只要有一个限流规则达到限流阈值即可抛出 FlowException,使用 FlowException 目的是标志当前请求因为达到限流阈值被拒绝,FlowException 是 BlockException 的子类;
  • 调用 canPassCheck 方法判断当前请求是否允许通过。

canPassCheck 即“can pass check”,意思是检查是否允许通过,后面我们也统一将“检查是否允许当前请求通过”使用 canPassCheck 代指,canPassCheck 方法返回 true 说明允许请求通过,反之则不允许通过。canPassCheck 方法源码如下:

public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) {

        // (1)

        String limitApp = rule.getLimitApp();

        if (limitApp == null) {

            return true;

        }

        // (2)

        if (rule.isClusterMode()) {

            return passClusterCheck(rule, context, node, acquireCount, prioritized);

        }

        // (3)

        return passLocalCheck(rule, context, node, acquireCount, prioritized);

}
  • 当前限流规则只对哪个调用来源生效,如果为 null 则返回 true,一般不为 null,默认为“default”(不限定调用来源);
  • 是否是集群限流模式,如果是集群限流模式则调用 passClusterCheck 方法完成 canPassCheck,我们暂时先不讨论集群限流的情况;
  • 非集群限流模式则调用 passLocalCheck 方法完成 canPassCheck。

passLocalCheck 方法源码如下:

private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,

                                          boolean prioritized) {

        // (1)

        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);

        if (selectedNode == null) {

            return true;

        }

        // (2)

        return rule.getRater()

          // (3)

          .canPass(selectedNode, acquireCount, prioritized);

    }
  • 根据调用来源和“调用关系限流策略”选择 DefaultNode;
  • 获取限流规则配置的流量效果控制器(TrafficShapingController);
  • 调用 TrafficShapingController#canPass 方法完成 canPassCheck。

selectNodeByRequesterAndStrategy 方法的实现逻辑很复杂,实现根据限流规则配置的 limitApp 与 strategy 选择一个 StatisticNode,两个字段的组合情况可以有 6 种。selectNodeByRequesterAndStrategy 方法源码如下:

static Node selectNodeByRequesterAndStrategy(FlowRule rule, Context context, DefaultNode node) {

        // 限流规则针对哪个来源生效

        String limitApp = rule.getLimitApp();

        // 基于调用关系的限流策略

        int strategy = rule.getStrategy();

        // 远程来源

        String origin = context.getOrigin();

        if (limitApp.equals(origin) && filterOrigin(origin)) {

            if (strategy == RuleConstant.STRATEGY_DIRECT) {

                //(1)

                return context.getOriginNode();

            }

            //(2)

            return selectReferenceNode(rule, context, node);

        }

        else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {

            if (strategy == RuleConstant.STRATEGY_DIRECT) {

                //(3)

                return node.getClusterNode();

            }

            //(4)

            return selectReferenceNode(rule, context, node);

        }

        else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)

            && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {

            if (strategy == RuleConstant.STRATEGY_DIRECT) {

                 //(5)

                return context.getOriginNode();

            }

            //(6)

            return selectReferenceNode(rule, context, node);

        }

        return null;

}

如果当前限流规则的 limitApp 为 default,则说明该限流规则对任何调用来源都生效,针对所有调用来源限流,否则只针对指定调用来源限流。

\1. 如果调用来源与当前限流规则的 limitApp 相等,且 strategy 为 STRATEGY_DIRECT,则使用调用来源的 StatisticNode,实现针对调用来源限流。例如,当前服务名称为 demo-srv-b,请求调用来源为 demo-srv-a 服务,资源名称为“/hello”,那么 origin 的 StatisticNode 用于针对访问来源为 demo-srv-a 的“/hello”资源的指标数据统计。

\2. 前置条件与(1)相同,依然是针对来源限流。

  • strategy 为 STRATEGY_RELATE:根据限流规则配置的 refResource 获取引用资源的 ClusterNode,即使用引用资源的指标数据限流。通俗点说就是使用其它资源的指标数据限流,你的并发量高我就限流,让你多处理一点请求,等你并发量降低了,我就不限流了;
  • strategy 为 STRATEGY_CHAIN:使用当前资源的 DefauleNode,实现按调用链路的资源指标数据限流。

\3. 当 limitApp 为 default 时,针对所有来源限流。如果 strategy 为 STRATEGY_DIRECT,则使用当前资源的 ClusterNode。

\4. 前置条件与(3)相同,依然是针对所有来源限流。

  • strategy 为 STRATEGY_RELATE:使用引用资源的 ClusterNode;
  • strategy 为 STRATEGY_CHAIN:使用当前资源的 DefauleNode。

\5. 如果 limitApp 为 other,且该资源的所有限流规则都没有针对当前的调用来源限流。如果 strategy 为 STRATEGY_DIRECT,则使用 origin 的 StatisticNode。

\6. 前置条件与(5)一样。

  • strategy 为 STRATEGY_RELATE:使用引用资源的 ClusterNode;
  • strategy 为 STRATEGY_CHAIN:使用当前资源的 DefauleNode。

从 selectNodeByRequesterAndStrategy 方法可以看出,Sentinel 之所以针对每个资源统计访问来源的指标数据,也是为了实现对丰富的限流策略的支持。

因为每个调用来源服务对同一个资源的访问频率都是不同的,针对调用来源限流可限制并发量较高的来源服务的请求,而对并发量低的来源服务的请求可不限流,或者是对一些并没有那么重要的来源服务限流。

当两个资源之间具有资源争抢关系的时候,使用 STRATEGY_RELATE 调用关系限流策略可避免多个资源之间过度的对同一资源争抢。例如查询订单信息和用户下单两个分别读和写数据库订单表的资源,如下图所示。

10-02-基于调用关系限流

我们可以给执行读表操作的资源设置限流规则实现写优先的目的,查询订单信息的资源根据用户下单的资源的实时指标数据限流,当写表操作过于频繁时,读表操作的请求就会被限流。